package taskprocess

import (
	"fmt"
	"sync"
	"time"

	"git.oschina.net/chaos.su/go-nsq"
	"gopkg.in/vmihailenco/msgpack.v2"
)

// AppPatchNotifier is the nsq message handler for queued tasks: HandleMessage
// decodes each message and dispatches it by task type. It also owns the stop
// channel used by ShouldStop, StopN and Shutdown.
type AppPatchNotifier struct {
	// The embedded lock is taken for writing by StopN while it sends stop
	// signals and decrements cocurrency.
	sync.RWMutex
	// stopChan carries stop signals: StopN sends true on it, Shutdown closes
	// it, and ShouldStop exposes its receive side.
	stopChan   chan bool
	// cocurrency (sic) caps how many stop signals StopN will send; it is
	// decremented once per signal sent.
	// NOTE(review): presumably the number of running consumers — confirm
	// against the code that sets it.
	cocurrency uint32
}

// HandleMessage implements the nsq message handler for AppPatchNotifier. It
// decodes the msgpack-encoded TaskInQueue carried by msg, decodes the typed
// payload held in its RawBytes and passes it to the handler for that task
// type (patch, schema or event).
//
// Auto response is disabled, so the message is answered here: it is finished
// when handling succeeds; otherwise it is requeued without backoff with a 60
// second delay and a warning is logged. The handling error, if any, is
// returned to the caller as well.
func (apn *AppPatchNotifier) HandleMessage(msg *nsq.Message) error {
	var handleErr error

	// Answer the message on the way out, based on the final handleErr.
	defer func() {
		if handleErr != nil {
			msg.RequeueWithoutBackoff(time.Second * 60)
			logger.Warnf("message handle error: %v: message: %s", handleErr, msg.Body)
			return
		}
		msg.Finish()
	}()
	msg.DisableAutoResponse()

	envelope := new(TaskInQueue)
	if decodeErr := msgpack.Unmarshal(msg.Body, envelope); decodeErr != nil {
		handleErr = fmt.Errorf("Unable to decode task structure: %v", decodeErr)
		return handleErr
	}

	switch envelope.Type {
	case TaskTypePatch:
		task := new(PatchTask)
		if decodeErr := msgpack.Unmarshal(envelope.RawBytes, task); decodeErr != nil {
			handleErr = fmt.Errorf("Unable to decode patch task info: %v", decodeErr)
			break
		}
		logger.Debugf("Got patch task: %+v", *task)
		handleErr = HandlePatchUpdate(task)

	case TaskTypeSchema:
		task := new(SchemaTask)
		if decodeErr := msgpack.Unmarshal(envelope.RawBytes, task); decodeErr != nil {
			// The partially decoded task is included to help diagnose the payload.
			handleErr = fmt.Errorf("Unable to decode schema task info: %v, %v", decodeErr, *task)
			break
		}
		logger.Debugf("Got schema task: %+v", *task)
		handleErr = HandleSchemaUpdate(task)

	case TaskTypeEvent:
		task := new(EventTask)
		if decodeErr := msgpack.Unmarshal(envelope.RawBytes, task); decodeErr != nil {
			// The partially decoded task is included to help diagnose the payload.
			handleErr = fmt.Errorf("Unable to decode event task info: %v, %v", decodeErr, *task)
			break
		}
		logger.Debugf("Got event task: %+v", *task)
		handleErr = HandleEvent(task)

	default:
		handleErr = fmt.Errorf("Unknown task type: %v", envelope.Type)
	}

	return handleErr
}

// ShouldStop exposes the notifier's stop channel as a receive-only channel.
// Values sent by StopN, and the close performed by Shutdown, are observed on
// the returned channel.
func (apn *AppPatchNotifier) ShouldStop() <-chan bool {
	var signals <-chan bool = apn.stopChan
	return signals
}

// Shutdown shuts the processor down completely by closing the stop channel,
// so every pending and future receive on the channel returned by ShouldStop
// completes immediately.
//
// It must be called at most once: closing an already closed channel panics,
// as does a later send on the closed channel by StopN.
func (apn *AppPatchNotifier) Shutdown() {
	stop := apn.stopChan
	close(stop)
}

// StopN sends up to n stop signals on the stop channel, never more than the
// current cocurrency value. cocurrency is decremented once for every signal
// sent. The write lock is held for the whole call, including while a send is
// blocked waiting for a receiver.
func (apn *AppPatchNotifier) StopN(n uint32) {
	apn.Lock()
	defer apn.Unlock()

	for remaining := n; remaining > 0; remaining-- {
		if apn.cocurrency == 0 {
			// Nothing left to stop.
			break
		}
		apn.stopChan <- true
		apn.cocurrency--
	}
}
